<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="１、KafkaConsumer poll 详解消息拉起主要入口为：KafkaConsumer#poll方法，其声明如下： 123public ConsumerRecords&lt;K, V&gt; poll(final Duration timeout) &amp;#123;  &#x2F;&#x2F; @1    return poll(time.timer(timeout), true);">
<meta property="og:type" content="article">
<meta property="og:title" content="源码分析Kafka 消息拉取流程">
<meta property="og:url" content="https://www.codingw.net/posts/497923c7.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="１、KafkaConsumer poll 详解消息拉起主要入口为：KafkaConsumer#poll方法，其声明如下： 123public ConsumerRecords&lt;K, V&gt; poll(final Duration timeout) &amp;#123;  &#x2F;&#x2F; @1    return poll(time.timer(timeout), true);">
<meta property="og:locale" content="zh_CN">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191208193241248.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191208193319781.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/2019120819365575.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191208194302768.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/2019120819462738.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191208194637842.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191208194720402.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191208194726675.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="article:published_time" content="2020-10-22T15:19:01.000Z">
<meta property="article:modified_time" content="2021-04-26T12:10:59.739Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/20191208193241248.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  // Reuse window.NexT when an earlier script already created it; otherwise start from an empty object.
  var NexT = window.NexT || {};

  // Page-wide settings object for this generated page.
  // NOTE(review): presumably read by theme scripts loaded later in the page; the consumers are not visible here.
  var CONFIG = {
    root: "",
    scheme: "Muse",
    version: "5.1.4",
    sidebar: {
      position: "left",
      display: "post",
      offset: 12,
      b2t: false,
      scrollpercent: false,
      onmobile: false
    },
    fancybox: true,
    tabs: true,
    motion: {
      enable: true,
      async: false,
      transition: {
        post_block: "fadeIn",
        post_header: "slideDownIn",
        post_body: "slideDownIn",
        coll_header: "slideLeftIn",
        sidebar: "slideUpIn"
      }
    },
    duoshuo: {
      userId: "0",
      author: "博主"
    },
    algolia: {
      applicationID: "",
      apiKey: "",
      indexName: "",
      hits: { per_page: 10 },
      labels: {
        input_placeholder: "Search for Posts",
        // ${query}, ${hits} and ${time} are literal text inside ordinary strings, not template literals.
        hits_empty: "We didn't find any results for the search: ${query}",
        hits_stats: "${hits} results found in ${time} ms"
      }
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/497923c7.html"/>





  <title>源码分析Kafka 消息拉取流程 | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <!-- Icon-only toggle button: the three bars carry no text, so the accessible name comes from aria-label. -->
  <!-- NOTE(review): presumably a theme script binds the click handler that shows/hides the site nav; it is not visible in this chunk. -->
  <div class="site-nav-toggle">
    <button type="button" aria-label="切换导航菜单">
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/497923c7.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">源码分析Kafka 消息拉取流程</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-10-22T23:19:01+08:00">
                2020-10-22
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/kafka/" itemprop="url" rel="index">
                    <span itemprop="name">kafka</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/497923c7.html" class="leancloud_visitors" data-flag-title="源码分析Kafka 消息拉取流程">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><h2 id="１、KafkaConsumer-poll-详解"><a href="#１、KafkaConsumer-poll-详解" class="headerlink" title="１、KafkaConsumer poll 详解"></a>１、KafkaConsumer poll 详解</h2><p>消息拉取主要入口为：KafkaConsumer#poll方法，其声明如下：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> ConsumerRecords&lt;K, V&gt; <span class="title">poll</span><span class="params">(<span class="keyword">final</span> Duration timeout)</span> </span>&#123;  <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">return</span> poll(time.timer(timeout), <span class="keyword">true</span>);                                     <span class="comment">// @2</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码＠１：参数为超时时间，使用 java 的 Duration 来定义。<br>代码＠２：调用内部的 poll 方法。</p>
<p>KafkaConsumer#poll</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> ConsumerRecords&lt;K, V&gt; <span class="title">poll</span><span class="params">(<span class="keyword">final</span> Timer timer, <span class="keyword">final</span> <span class="keyword">boolean</span> includeMetadataInTimeout)</span> </span>&#123;  <span class="comment">// @1</span></span><br><span class="line">    acquireAndEnsureOpen();                                                                                                               <span class="comment">// @2</span></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (<span 
class="keyword">this</span>.subscriptions.hasNoSubscriptionOrUserAssignment()) &#123;                                                  <span class="comment">// @3</span></span><br><span class="line">            <span class="keyword">throw</span> <span class="keyword">new</span> IllegalStateException(<span class="string">&quot;Consumer is not subscribed to any topics or assigned any partitions&quot;</span>);</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        <span class="comment">// poll for new data until the timeout expires</span></span><br><span class="line">        <span class="keyword">do</span> &#123;　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　　<span class="comment">// @4</span></span><br><span class="line">            client.maybeTriggerWakeup();                                                                                               <span class="comment">//@5</span></span><br><span class="line"></span><br><span class="line">            <span class="keyword">if</span> (includeMetadataInTimeout) &#123;                       　　　　　　　　　　　　　　　　　　　 <span class="comment">// @6 　　　　　　　　　　　　　　　　　　　　                                                          </span></span><br><span class="line">                <span class="keyword">if</span> (!updateAssignmentMetadataIfNeeded(timer)) &#123;</span><br><span class="line">                    <span class="keyword">return</span> ConsumerRecords.empty();</span><br><span class="line">                &#125;</span><br><span class="line">            &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                <span class="keyword">while</span> (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) &#123;        </span><br><span class="line">                    log.warn(<span class="string">&quot;Still waiting for metadata&quot;</span>);</span><br><span class="line">                &#125;</span><br><span class="line">            
&#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">final</span> Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; records = pollForFetches(timer);   <span class="comment">// @7</span></span><br><span class="line">            <span class="keyword">if</span> (!records.isEmpty()) &#123;                                                                                                           </span><br><span class="line">                <span class="keyword">if</span> (fetcher.sendFetches() &gt; <span class="number">0</span> || client.hasPendingRequests()) &#123;                                           <span class="comment">// @8</span></span><br><span class="line">                    client.pollNoWakeup();</span><br><span class="line">                &#125;</span><br><span class="line">                <span class="keyword">return</span> <span class="keyword">this</span>.interceptors.onConsume(<span class="keyword">new</span> ConsumerRecords&lt;&gt;(records));                         <span class="comment">// @９</span></span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">while</span> (timer.notExpired());                                                                                                         </span><br><span class="line"></span><br><span class="line">        <span class="keyword">return</span> ConsumerRecords.empty();</span><br><span class="line">    &#125; <span class="keyword">finally</span> &#123;</span><br><span class="line">        release();</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码＠１：首先先对其参数含义进行讲解。</p>
<ul>
<li>boolean includeMetadataInTimeout<br>拉取消息的超时时间是否包含更新元数据的时间，默认为true，即包含。</li>
</ul>
<p>代码＠２：检查是否可以拉取消息，其主要判断依据如下：</p>
<ul>
<li>KafkaConsumer 是否有其他线程正在执行，如果有，则抛出异常，因为 KafkaConsumer 是线程不安全的，同一时间只能由一个线程执行。</li>
<li>KafkaConsumer 没有被关闭。</li>
</ul>
<p>代码＠３：如果当前消费者未订阅任何主题或者没有指定队列，则抛出错误，结束本次消息拉取。</p>
<p>代码＠４：使用 do while 结构循环拉取消息，直到超时或拉取到消息。</p>
<p>代码＠５：如果当前未禁用 wakeup，并且有其他线程请求唤醒（即调用了 wakeup 方法），则抛出 WakeupException 以中断本次拉取；在禁用 wakeup 期间（例如下面的 @8 处）则不会抛出。</p>
<p>代码＠６：更新相关元数据，为真正向 broker 发送消息拉取请求做好准备，该方法将在下面详细介绍，现在先简单介绍其核心实现点：</p>
<ul>
<li>如有必要，先向 broker 端拉取最新的订阅信息(包含消费组内的在线的消费客户端)。</li>
<li>执行已完成(异步提交)的 offset 提交请求的回调函数。</li>
<li>维护与 broker 端的心跳请求，确保不会被“踢出”消费组。</li>
<li>更新元信息。</li>
<li>如果是自动提交消费偏移量，则自动提交偏移量。</li>
<li>更新各个分区下次待拉取的偏移量。</li>
</ul>
<p>这里会有一个更新元数据是否占用消息拉取的超时时间，默认为 true。</p>
<p>代码＠７：调用 pollForFetches 向broker拉取消息，该方法将在下文详细介绍。</p>
<p>代码＠８：如果拉取到的消息集合不为空，在返回该批消息之前，如果还有积压的拉取请求，可以继续发送拉取请求，但此时会禁用 wakeup，主要的目的是用户在处理消息时，KafkaConsumer 还可以继续向 broker 拉取消息。</p>
<p>代码＠９：执行消费拦截器。</p>
<p>接下来对上文提到的代码＠６、＠７进行详细介绍。</p>
<span id="more"></span>

<h4 id="1-1-KafkaConsumer-updateAssignmentMetadataIfNeeded-详解"><a href="#1-1-KafkaConsumer-updateAssignmentMetadataIfNeeded-详解" class="headerlink" title="1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 详解"></a>1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 详解</h4><p>KafkaConsumer＃updateAssignmentMetadataIfNeeded</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">boolean</span> <span class="title">updateAssignmentMetadataIfNeeded</span><span class="params">(<span class="keyword">final</span> Timer timer)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (coordinator != <span class="keyword">null</span> &amp;&amp; !coordinator.poll(timer)) &#123;                            <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> updateFetchPositions(timer);                                                  <span class="comment">// @2</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>要理解这个方法实现的用途，我们就必须依次对 coordinator.poll 方法与 updateFetchPositions 方法进行分析。</p>
<h5 id="1-1-1-ConsumerCoordinator-poll"><a href="#1-1-1-ConsumerCoordinator-poll" class="headerlink" title="1.1.1 ConsumerCoordinator#poll"></a>1.1.1 ConsumerCoordinator#poll</h5><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">boolean</span> <span class="title">poll</span><span class="params">(Timer timer)</span> </span>&#123;</span><br><span class="line">    invokeCompletedOffsetCommitCallbacks();  <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">if</span> (subscriptions.partitionsAutoAssigned()) &#123;  <span class="comment">// @2</span></span><br><span class="line">        pollHeartbeat(timer.currentTimeMs());       <span class="comment">// @21</span></span><br><span class="line">        <span class="keyword">if</span> (coordinatorUnknown() &amp;&amp; !ensureCoordinatorReady(timer)) &#123;   <span class="comment">//@22</span></span><br><span class="line">            <span class="keyword">return</span> 
<span class="keyword">false</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">if</span> (rejoinNeededOrPending()) &#123;                                                       <span class="comment">// @23</span></span><br><span class="line">            <span class="keyword">if</span> (subscriptions.hasPatternSubscription()) &#123;                              <span class="comment">// @231</span></span><br><span class="line">                <span class="keyword">if</span> (<span class="keyword">this</span>.metadata.timeToAllowUpdate(time.milliseconds()) == <span class="number">0</span>) &#123;  </span><br><span class="line">                    <span class="keyword">this</span>.metadata.requestUpdate();</span><br><span class="line">                &#125;</span><br><span class="line">                <span class="keyword">if</span> (!client.ensureFreshMetadata(timer)) &#123;                                  </span><br><span class="line">                    <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">                &#125;</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">if</span> (!ensureActiveGroup(timer)) &#123;                                                <span class="comment">// @232</span></span><br><span class="line">                <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;                                                            <span class="comment">// @3</span></span><br><span class="line">        <span class="keyword">if</span> (metadata.updateRequested() &amp;&amp; !client.hasReadyNodes(timer.currentTimeMs())) &#123;</span><br><span class="line">            
client.awaitMetadataUpdate(timer);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());   <span class="comment">// @4</span></span><br><span class="line">    <span class="keyword">return</span> <span class="keyword">true</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码＠1：执行已完成的 offset (消费进度)提交请求的回调函数。</p>
<p>代码@2：队列负载算法为自动分配（即 Kafka 根据消费者个数与分区数动态负载分区）的相关的处理逻辑。其实现关键点如下：</p>
<ul>
<li>代码@21：更新发送心跳相关的时间，例如 heartbeatTimer、sessionTimer、pollTimer 分别代表最新发送心跳的时间、会话最新活跃时间、最新拉取消息的时间。</li>
<li>代码@22：如果协调器未知（不存在或已断开连接），并且在超时时间内未能就绪，则返回 false，结束本次拉取。如果协调器就绪，则继续往下走。</li>
<li>代码@23：判断是否需要触发重平衡，即消费组内的所有消费者重新分配topic中的分区信息，例如元数据发生变化，判断是否需要重平衡的关键点如下：<ul>
<li>如果队列负载是通过用户指定的，则返回 false，表示无需重平衡。</li>
<li>如果队列是自动负载，topic 队列元数据发生了变化，则需要重平衡。</li>
<li>如果队列是自动负载，订阅关系发生了变化，则需要重平衡。<br>如果需要重平衡，则同步更新元数据，此过程会阻塞。详细的重平衡将单独重点介绍，这里暂时不深入展开。</li>
</ul>
</li>
</ul>
<p>代码@3：用户手动为消费组指定负载的队列的相关处理逻辑，其实现关键如下：</p>
<ul>
<li>如果需要更新元数据，并且还没有分区准备好，则同步阻塞等待元数据更新完毕。</li>
</ul>
<p>代码@4：如果开启了自动提交消费进度，并且已到下一次提交时间，则提交。Kafka 消费者可以通过设置属性 enable.auto.commit 来开启自动提交，该参数默认为 true，则默认会每隔 5s 提交一次消费进度，提交间隔可以通过参数 auto.commit.interval.ms 设置。</p>
<p>接下来继续探讨 updateAssignmentMetadataIfNeeded (更新元数据)的第二个步骤，更新拉取位移。</p>
<h5 id="1-1-2-updateFetchPositions-详解"><a href="#1-1-2-updateFetchPositions-详解" class="headerlink" title="1.1.2 updateFetchPositions 详解"></a>1.1.2 updateFetchPositions 详解</h5><p>KafkaConsumer#updateFetchPositions</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">boolean</span> <span class="title">updateFetchPositions</span><span class="params">(<span class="keyword">final</span> Timer timer)</span> </span>&#123;</span><br><span class="line">    cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();  </span><br><span class="line">    <span class="keyword">if</span> (cachedSubscriptionHashAllFetchPositions) &#123;           <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">return</span> <span class="keyword">true</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">if</span> (coordinator != <span class="keyword">null</span> &amp;&amp; !coordinator.refreshCommittedOffsetsIfNeeded(timer))   <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">    subscriptions.resetMissingPositions();                         <span class="comment">// @3</span></span><br><span class="line">    fetcher.resetOffsetsIfNeeded();                                    <span class="comment">// @4</span></span><br><span class="line">    <span class="keyword">return</span> <span class="keyword">true</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果订阅关系中的所有分区都有有效的位移，则返回 true。</p>
<p>代码@2：如果存在任意一个分区没有有效的位移信息，则需要向 broker 发送请求，从broker 获取该消费组，该分区的消费进度。相关的实现细节将在后续文章【Kafka 消费进度】专题文章中详细介绍。</p>
<p>代码@3：如果经过第二步，订阅关系中某些分区还是没有获取到有效的偏移量，则使用偏移量重置策略进行重置，如果未配置，则抛出异常。</p>
<p>代码@4：发送一个异步请求去重置那些正等待重置位置的分区。有关 Kafka 消费进度、重平衡等知识将会在后续文章中深入探讨，本文只需了解 poll 消息的核心处理流程。</p>
<p>从 KafkaConsumer#poll 中流程可以看到，通过 updateAssignmentMetadataIfNeeded 对元数据、重平衡，更新拉取偏移量等工作处理完成后，下一步就是需要向 broker 拉取消息了，其实现入口为：KafkaConsumer 的 pollForFetches 方法。</p>
<h4 id="1-2-消息拉取"><a href="#1-2-消息拉取" class="headerlink" title="1.2 消息拉取"></a>1.2 消息拉取</h4><p>KafkaConsumer#pollForFetches</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">private</span> Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; pollForFetches(Timer timer) &#123;</span><br><span class="line">        <span class="keyword">long</span> pollTimeout = coordinator == <span class="keyword">null</span> ? 
timer.remainingMs() :</span><br><span class="line">                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());   <span class="comment">// @1</span></span><br><span class="line">        <span class="comment">// if data is available already, return it immediately</span></span><br><span class="line">        <span class="keyword">final</span> Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; records = fetcher.fetchedRecords();    <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">if</span> (!records.isEmpty()) &#123;</span><br><span class="line">            <span class="keyword">return</span> records;</span><br><span class="line">        &#125;</span><br><span class="line">        fetcher.sendFetches();                               <span class="comment">// @3</span></span><br><span class="line">        <span class="comment">// We do not want to be stuck blocking in poll if we are missing some positions</span></span><br><span class="line">        <span class="comment">// since the offset lookup may be backing off after a failure</span></span><br><span class="line">        <span class="comment">// <span class="doctag">NOTE:</span> the use of cachedSubscriptionHashAllFetchPositions means we MUST call</span></span><br><span class="line">        <span class="comment">// updateAssignmentMetadataIfNeeded before this method.</span></span><br><span class="line">        <span class="keyword">if</span> (!cachedSubscriptionHashAllFetchPositions &amp;&amp; pollTimeout &gt; retryBackoffMs) &#123;   <span class="comment">// @4</span></span><br><span class="line">            pollTimeout = retryBackoffMs;</span><br><span class="line">        &#125;</span><br><span class="line">        Timer pollTimer = time.timer(pollTimeout);</span><br><span class="line">        client.poll(pollTimer, () -&gt; &#123;</span><br><span class="line">            <span class="keyword">return</span> 
!fetcher.hasCompletedFetches();</span><br><span class="line">        &#125;);         <span class="comment">// @5</span></span><br><span class="line">        timer.update(pollTimer.currentTimeMs());   <span class="comment">// @6</span></span><br><span class="line">        <span class="keyword">if</span> (coordinator != <span class="keyword">null</span> &amp;&amp; coordinator.rejoinNeededOrPending()) &#123;  <span class="comment">// @7</span></span><br><span class="line">            <span class="keyword">return</span> Collections.emptyMap();</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">return</span> fetcher.fetchedRecords();   <span class="comment">// @8</span></span><br><span class="line">    &#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：计算本次拉取的超时时间，其计算逻辑如下：</p>
<ul>
<li>如果协调器为空，则返回当前定时器剩余时间即可。</li>
<li>如果协调器不为空，其逻辑较为复杂，为下面返回的超时时间与当前定时器剩余时间相比取最小值。</li>
<li>如果不开启自动提交位移并且未加入消费组，则超时时间为Long.MAX_VALUE。</li>
<li>如果不开启自动提交位移并且已加入消费组，则返回距离下一次发送心跳包还剩多少时间。</li>
<li>如果开启自动提交位移，则返回 距离下一次自动提交位移所需时间 与 距离下一次发送心跳包所需时间 之间的最小值。</li>
</ul>
<p>代码@2：如果数据已经拉回到本地，直接返回数据。将在下文详细介绍 Fetcher 的 fetchedRecords 方法。</p>
<p>代码@3：组装发送请求，并将其存储在待发送请求列表中。</p>
<p>代码@4：如果已缓存的分区信息中存在某些分区缺少偏移量，如果拉取的超时时间大于失败重试需要阻塞的时间，则更新此次拉取的超时时间为失败重试需要的间隔时间，主要的目的是不希望在 poll 过程中被阻塞【后续会详细介绍 Kafka 拉取消息的线程模型，再来回顾一下这里】。</p>
<p>代码@5：通过调用NetworkClient 的 poll 方法发起消息拉取操作（触发网络读写）。</p>
<p>代码@6：更新本次拉取的时间。</p>
<p>代码@7：检查是否需要重平衡。</p>
<p>代码@8：将从 broker 读取到的数据返回（即封装成消息）。</p>
<p>从上面消息拉取流程来看，有几个比较重要的方法，例如 Fetcher 类相关的方法，NetworkClient 的 poll 方法，那我们接下来来重点探讨。</p>
<p>我们先用一张流程图总结一下消息拉取的全过程：<br><img src="https://img-blog.csdnimg.cn/20191208193241248.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="Kafka 消息拉取流程图"><br>接下来我们将重点看一下 KafkaConsumer 的 pollForFetches 详细过程，也就是需要详细探究 Fetcher 类的实现细节。</p>
<h2 id="2、Fetcher-类详解"><a href="#2、Fetcher-类详解" class="headerlink" title="2、Fetcher 类详解"></a>2、Fetcher 类详解</h2><p>Fetcher 封装消息拉取的方法，可以看成是消息拉取的门面类。</p>
<h4 id="2-1-类图"><a href="#2-1-类图" class="headerlink" title="2.1 类图"></a>2.1 类图</h4><p><img src="https://img-blog.csdnimg.cn/20191208193319781.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="Fetcher 类图"><br>我们首先一一介绍一下 Fetcher 的核心属性与核心方法。</p>
<ul>
<li>ConsumerNetworkClient client<br>消费端网络客户端，Kafka 负责网络通讯实现类。</li>
<li>int minBytes<br>一次消息拉取需要拉取的最小字节数，如果不足，会阻塞，默认值为1字节，如果增大这个值会增大吞吐，但会增加延迟，可以通过参数 fetch.min.bytes 改变其默认值。</li>
<li>int maxBytes<br>一次消息拉取允许拉取的最大字节数，但这不是绝对的，如果一个分区的第一批记录超过了该值，也会返回。默认为50M,可通过参数 fetch.max.bytes 改变其默认值。同时不能超过 broker的配置参数(message.max.bytes) 和 主题级别的配置(max.message.bytes)。</li>
<li>int maxWaitMs<br>在 broker 如果符合拉取条件的数据小于 minBytes 时阻塞的时间，默认为 500ms ，可通过属性 fetch.max.wait.ms 进行定制。</li>
<li>int fetchSize<br>每一个分区返回的最大消息字节数，如果分区中的第一批消息大于 fetchSize 也会返回。</li>
<li>long retryBackoffMs<br>失败重试后需要阻塞的时间，默认为 100 ms，可通过参数 retry.backoff.ms 定制。</li>
<li>long requestTimeoutMs<br>客户端向 broker 发送请求最大的超时时间，默认为 30s，可以通过 request.timeout.ms 参数定制。</li>
<li>int maxPollRecords<br>单次拉取返回的最大记录数，默认值 500，可通过参数 max.poll.records 进行定制。</li>
<li>boolean checkCrcs<br>是否检查消息的 crcs 校验和，默认为 true，可通过参数 check.crcs 进行定制。</li>
<li>Metadata metadata<br>元数据。</li>
<li>FetchManagerMetrics sensors<br>消息拉取的统计服务类。</li>
<li>SubscriptionState subscriptions<br>订阅信息状态。</li>
<li>ConcurrentLinkedQueue&lt; CompletedFetch&gt; completedFetches<br>已完成的 Fetch 的请求结果，待消费端从中取出数据。</li>
<li>Deserializer&lt; K&gt; keyDeserializer<br>key 的反序列化器。</li>
<li>Deserializer&lt; V&gt; valueDeserializer<br>value 的反序列化器。</li>
<li>IsolationLevel isolationLevel<br>Kafka的隔离级别（与事务消息相关），后续在研究其事务相关时再进行探讨。</li>
<li>Map&lt;Integer, FetchSessionHandler&gt; sessionHandlers<br>拉取会话监听器。</li>
</ul>
<p>接下来我们将按照消息流程，一起来看一下 Fetcher 的核心方法。</p>
<h4 id="2-2-Fetcher-核心方法"><a href="#2-2-Fetcher-核心方法" class="headerlink" title="2.2 Fetcher 核心方法"></a>2.2 Fetcher 核心方法</h4><h5 id="2-2-1-Fetcher-fetchedRecords"><a href="#2-2-1-Fetcher-fetchedRecords" class="headerlink" title="2.2.1 Fetcher#fetchedRecords"></a>2.2.1 Fetcher#fetchedRecords</h5><p>Fetcher#fetchedRecords</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">public</span> Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; fetchedRecords() &#123;</span><br><span class="line">    Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; fetched = <span class="keyword">new</span> HashMap&lt;&gt;();   <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">int</span> recordsRemaining = maxPollRecords;                                                              </span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span 
class="line">        <span class="keyword">while</span> (recordsRemaining &gt; <span class="number">0</span>) &#123;                                                                                  <span class="comment">// @2</span></span><br><span class="line">            <span class="keyword">if</span> (nextInLineRecords == <span class="keyword">null</span> || nextInLineRecords.isFetched) &#123;                           <span class="comment">// @3</span></span><br><span class="line">                CompletedFetch completedFetch = completedFetches.peek();</span><br><span class="line">                <span class="keyword">if</span> (completedFetch == <span class="keyword">null</span>) <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">try</span> &#123;</span><br><span class="line">                    nextInLineRecords = parseCompletedFetch(completedFetch);</span><br><span class="line">                &#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">                    FetchResponse.PartitionData partition = completedFetch.partitionData;</span><br><span class="line">                    <span class="keyword">if</span> (fetched.isEmpty() &amp;&amp; (partition.records == <span class="keyword">null</span> || partition.records.sizeInBytes() == <span class="number">0</span>)) &#123;</span><br><span class="line">                        completedFetches.poll();</span><br><span class="line">                    &#125;</span><br><span class="line">                    <span class="keyword">throw</span> e;</span><br><span class="line">                &#125;</span><br><span class="line">                completedFetches.poll();</span><br><span class="line">             &#125; <span class="keyword">else</span> &#123;                                                                                                                         <span class="comment">// @4</span></span><br><span 
class="line">                List&lt;ConsumerRecord&lt;K, V&gt;&gt; records = fetchRecords(nextInLineRecords, recordsRemaining);</span><br><span class="line">                TopicPartition partition = nextInLineRecords.partition;</span><br><span class="line">                <span class="keyword">if</span> (!records.isEmpty()) &#123;</span><br><span class="line">                    List&lt;ConsumerRecord&lt;K, V&gt;&gt; currentRecords = fetched.get(partition);</span><br><span class="line">                    <span class="keyword">if</span> (currentRecords == <span class="keyword">null</span>) &#123;</span><br><span class="line">                        fetched.put(partition, records);</span><br><span class="line">                    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                        List&lt;ConsumerRecord&lt;K, V&gt;&gt; newRecords = <span class="keyword">new</span> ArrayList&lt;&gt;(records.size() + currentRecords.size());</span><br><span class="line">                        newRecords.addAll(currentRecords);</span><br><span class="line">                        newRecords.addAll(records);</span><br><span class="line">                        fetched.put(partition, newRecords);</span><br><span class="line">                    &#125;</span><br><span class="line">                    recordsRemaining -= records.size();</span><br><span class="line">                &#125;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">catch</span> (KafkaException e) &#123;</span><br><span class="line">        <span class="keyword">if</span> (fetched.isEmpty())</span><br><span class="line">            <span class="keyword">throw</span> e;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> fetched;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先先解释两个局部变量的含义：</p>
<ul>
<li>Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; fetched 按分区存放已拉取的消息，返回给客户端进行处理。</li>
<li>recordsRemaining：剩余可拉取的消息条数。</li>
</ul>
<p>代码@2：循环去取已经完成了 Fetch 请求的消息，该 while 循环有两个跳出条件：</p>
<ul>
<li>如果拉取的消息已经达到一次拉取的最大消息条数，则跳出循环。</li>
<li>缓存中所有拉取结果已处理。</li>
</ul>
<p>代码@3、@4 主要完成从缓存中解析数据的两个步骤，初次运行的时候，会进入分支@3，调用 parseCompletedFetch 将拉取结果解析成 PartitionRecords 对象，然后代码@4的职责就是解析 PartitionRecords，将消息封装成 ConsumerRecord，返回给消费端线程处理。</p>
<p>代码@3的实现要点如下：</p>
<ul>
<li>首先从 completedFetches (Fetch请求的返回结果) 列表中获取一个 Fetch 请求的结果，主要使用的是 Queue 的 peek()方法，并不会从该队列中移除该元素。</li>
<li>然后调用 parseCompletedFetch 对处理结果进行解析返回 PartitionRecords。</li>
<li>处理成功后，调用 Queue 的 poll() 方法将已处理过的 Fetch 结果移除。</li>
</ul>
<p>从上面可知，上述方法的核心方法是：parseCompletedFetch。</p>
<p>代码@4的实现要点无非就是调用 fetchRecords 方法，按分区组装成 Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt;，供消费者处理，例如供业务处理。</p>
<p>接下来将重点探讨上述两个方法的实现细节。</p>
<h6 id="2-2-1-1-Fetcher-parseCompletedFetch"><a href="#2-2-1-1-Fetcher-parseCompletedFetch" class="headerlink" title="2.2.1.1 Fetcher#parseCompletedFetch"></a>2.2.1.1 Fetcher#parseCompletedFetch</h6><p>在尝试探讨该方法之前，我们首先对其入参进行一个梳理，特别是先认识其主要数据结构。</p>
<p>1、CompletedFetch 相关类图<br><img src="https://img-blog.csdnimg.cn/2019120819365575.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="CompletedFetch 相关类图"><br>从上图可以看出，CompletedFetch 核心属性主要如下：</p>
<ul>
<li>TopicPartition partition<br>分区信息，返回结果都是以分区为纬度。</li>
<li>long fetchedOffset<br>本次拉取的开始偏移量。</li>
<li>FetchResponse.PartitionData partitionData<br>返回的分区数据。</li>
<li>FetchResponseMetricAggregator metricAggregator<br>统计指标相关。</li>
<li>short responseVersion<br>broker 端的版本号。</li>
</ul>
<p>分区的数据是使用 PartitionData 来进行封装的。我们也来简单地了解一下其内部数据结构。</p>
<ul>
<li>Errors error<br>分区拉取的响应结果，Errors.NONE 表示请求成功。</li>
<li>long highWatermark<br>broker 端关于该分区的高水位线，即小于该偏移量的消息对于消费端是可见的。</li>
<li>long lastStableOffset<br>分区中小于该偏移量的消息的事务状态已得到确认，要么是已提交，要么是已回滚，与事务相关，后面会专门探讨。</li>
<li>List&lt; AbortedTransaction&gt; abortedTransactions<br>已中止的事务。</li>
<li>T records<br>分区数据，是 BaseRecords 的子类。</li>
</ul>
<p>2、parseCompletedFetch 详解</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span 
class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> PartitionRecords <span class="title">parseCompletedFetch</span><span class="params">(CompletedFetch completedFetch)</span> </span>&#123;</span><br><span class="line">    TopicPartition tp = completedFetch.partition;</span><br><span class="line">    FetchResponse.PartitionData&lt;Records&gt; partition = completedFetch.partitionData;</span><br><span class="line">    <span class="keyword">long</span> fetchOffset = completedFetch.fetchedOffset;</span><br><span class="line">    PartitionRecords partitionRecords = <span class="keyword">null</span>;</span><br><span class="line">    Errors error = partition.error;</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (!subscriptions.isFetchable(tp)) &#123;       <span class="comment">// @1</span></span><br><span class="line">            log.debug(<span class="string">&quot;Ignoring fetched records for partition 
&#123;&#125; since it is no longer fetchable&quot;</span>, tp);</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (error == Errors.NONE) &#123;         <span class="comment">// @2</span></span><br><span class="line">            Long position = subscriptions.position(tp);</span><br><span class="line">            <span class="keyword">if</span> (position == <span class="keyword">null</span> || position != fetchOffset) &#123;    <span class="comment">// @21</span></span><br><span class="line">                log.debug(<span class="string">&quot;Discarding stale fetch response for partition &#123;&#125; since its offset &#123;&#125; does not match &quot;</span> +</span><br><span class="line">                            <span class="string">&quot;the expected offset &#123;&#125;&quot;</span>, tp, fetchOffset, position);</span><br><span class="line">                <span class="keyword">return</span> <span class="keyword">null</span>;</span><br><span class="line">            &#125;</span><br><span class="line">            log.trace(<span class="string">&quot;Preparing to read &#123;&#125; bytes of data for partition &#123;&#125; with offset &#123;&#125;&quot;</span>,</span><br><span class="line">                        partition.records.sizeInBytes(), tp, position);</span><br><span class="line">            Iterator&lt;? 
extends RecordBatch&gt; batches = partition.records.batches().iterator();   <span class="comment">// @22</span></span><br><span class="line">            partitionRecords = <span class="keyword">new</span> PartitionRecords(tp, completedFetch, batches);</span><br><span class="line"></span><br><span class="line">            <span class="keyword">if</span> (!batches.hasNext() &amp;&amp; partition.records.sizeInBytes() &gt; <span class="number">0</span>) &#123;   <span class="comment">// @23</span></span><br><span class="line">                <span class="keyword">if</span> (completedFetch.responseVersion &lt; <span class="number">3</span>) &#123;</span><br><span class="line">                    Map&lt;TopicPartition, Long&gt; recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);</span><br><span class="line">                    <span class="keyword">throw</span> <span class="keyword">new</span> RecordTooLargeException(<span class="string">&quot;There are some messages at [Partition=Offset]: &quot;</span> +</span><br><span class="line">                                recordTooLargePartitions + <span class="string">&quot; whose size is larger than the fetch size &quot;</span> + <span class="keyword">this</span>.fetchSize +</span><br><span class="line">                                <span class="string">&quot; and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or &quot;</span> +</span><br><span class="line">                                <span class="string">&quot;newer to avoid this issue. 
Alternately, increase the fetch size on the client (using &quot;</span> +</span><br><span class="line">                                ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + <span class="string">&quot;)&quot;</span>,</span><br><span class="line">                                recordTooLargePartitions);</span><br><span class="line">                &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                    <span class="comment">// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)</span></span><br><span class="line">                    <span class="keyword">throw</span> <span class="keyword">new</span> KafkaException(<span class="string">&quot;Failed to make progress reading messages at &quot;</span> + tp + <span class="string">&quot;=&quot;</span> +</span><br><span class="line">                            fetchOffset + <span class="string">&quot;. Received a non-empty fetch response from the server, but no &quot;</span> +</span><br><span class="line">                            <span class="string">&quot;complete records were found.&quot;</span>);</span><br><span class="line">               &#125;</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">if</span> (partition.highWatermark &gt;= <span class="number">0</span>) &#123;   <span class="comment">// @24</span></span><br><span class="line">                log.trace(<span class="string">&quot;Updating high watermark for partition &#123;&#125; to &#123;&#125;&quot;</span>, tp, partition.highWatermark);</span><br><span class="line">                subscriptions.updateHighWatermark(tp, partition.highWatermark);</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">if</span> (partition.logStartOffset &gt;= <span class="number">0</span>) &#123;    
<span class="comment">// @25</span></span><br><span class="line">                log.trace(<span class="string">&quot;Updating log start offset for partition &#123;&#125; to &#123;&#125;&quot;</span>, tp, partition.logStartOffset);</span><br><span class="line">                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset);</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">if</span> (partition.lastStableOffset &gt;= <span class="number">0</span>) &#123; <span class="comment">// @26</span></span><br><span class="line">                log.trace(<span class="string">&quot;Updating last stable offset for partition &#123;&#125; to &#123;&#125;&quot;</span>, tp, partition.lastStableOffset);</span><br><span class="line">                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);</span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (error == Errors.NOT_LEADER_FOR_PARTITION ||</span><br><span class="line">                       error == Errors.REPLICA_NOT_AVAILABLE ||</span><br><span class="line">                       error == Errors.KAFKA_STORAGE_ERROR) &#123;                       <span class="comment">// @3</span></span><br><span class="line">                log.debug(<span class="string">&quot;Error in fetch for partition &#123;&#125;: &#123;&#125;&quot;</span>, tp, error.exceptionName());</span><br><span class="line">            <span class="keyword">this</span>.metadata.requestUpdate();</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) &#123;          <span class="comment">// @4</span></span><br><span class="line">            log.warn(<span class="string">&quot;Received unknown topic or partition error in fetch for 
partition &#123;&#125;&quot;</span>, tp);</span><br><span class="line">            <span class="keyword">this</span>.metadata.requestUpdate();</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (error == Errors.OFFSET_OUT_OF_RANGE) &#123;                        <span class="comment">// @5</span></span><br><span class="line">            <span class="keyword">if</span> (fetchOffset != subscriptions.position(tp)) &#123;</span><br><span class="line">                log.debug(<span class="string">&quot;Discarding stale fetch response for partition &#123;&#125; since the fetched offset &#123;&#125; &quot;</span> +</span><br><span class="line">                            <span class="string">&quot;does not match the current offset &#123;&#125;&quot;</span>, tp, fetchOffset, subscriptions.position(tp));</span><br><span class="line">            &#125; <span class="keyword">else</span> <span class="keyword">if</span> (subscriptions.hasDefaultOffsetResetPolicy()) &#123;</span><br><span class="line">                log.info(<span class="string">&quot;Fetch offset &#123;&#125; is out of range for partition &#123;&#125;, resetting offset&quot;</span>, fetchOffset, tp);</span><br><span class="line">                    subscriptions.requestOffsetReset(tp);</span><br><span class="line">            &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                <span class="keyword">throw</span> <span class="keyword">new</span> OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));</span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (error == Errors.TOPIC_AUTHORIZATION_FAILED) &#123;             <span class="comment">// @6</span></span><br><span class="line">            log.warn(<span class="string">&quot;Not authorized to read from topic &#123;&#125;.&quot;</span>, 
tp.topic());</span><br><span class="line">                <span class="keyword">throw</span> <span class="keyword">new</span> TopicAuthorizationException(Collections.singleton(tp.topic()));</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (error == Errors.UNKNOWN_SERVER_ERROR) &#123;                </span><br><span class="line">            log.warn(<span class="string">&quot;Unknown error fetching data for topic-partition &#123;&#125;&quot;</span>, tp);</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            <span class="keyword">throw</span> <span class="keyword">new</span> IllegalStateException(<span class="string">&quot;Unexpected error code &quot;</span> + error.code() + <span class="string">&quot; while fetching data&quot;</span>);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">finally</span> &#123;   <span class="comment">// @7</span></span><br><span class="line">        <span class="keyword">if</span> (partitionRecords == <span class="keyword">null</span>)</span><br><span class="line">            completedFetch.metricAggregator.record(tp, <span class="number">0</span>, <span class="number">0</span>);</span><br><span class="line"></span><br><span class="line">        <span class="keyword">if</span> (error != Errors.NONE)</span><br><span class="line">           <span class="comment">// we move the partition to the end if there was an error. 
This way, it&#x27;s more likely that partitions for</span></span><br><span class="line">           <span class="comment">// the same topic can remain together (allowing for more efficient serialization).</span></span><br><span class="line">           subscriptions.movePartitionToEnd(tp);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> partitionRecords;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>上面的代码虽然比较长，其实整体还是比较简单，只是需要针对各种异常处理，打印对应的日志，接下来详细介绍该方法的实现关键点。</p>
<p>代码@1：判断该分区是否可拉取，如果不可拉取，则忽略这批拉取的消息，判断是否可拉取的要点如下：</p>
<ul>
<li>当前消费者负载的队列包含该分区。</li>
<li>当前消费者针对该队列并没有被用户设置为暂停（消费端限流）。</li>
<li>当前消费者针对该队列有有效的拉取偏移量。</li>
</ul>
<p>代码@2：该分支是处理正常返回的相关逻辑。其关键点如下：</p>
<ul>
<li>如果当前针对该队列的消费位移 与 发起 fetch 请求时的 偏移量不一致，则认为本次拉取非法，直接返回 null ，如代码@21。</li>
<li>从返回结构中获取本次拉取的数据，使用数据迭代器，其基本数据单位为 RecordBatch，即一个发送批次，如代码@22。</li>
<li>如果返回结果中没有包含至少一个批次的消息，但是 sizeInBytes 又大于0，则直接抛出错误，根据服务端的版本，其错误信息有所不同，但主要是建议我们如何处理，如果 broker 的版本低于 0.10.1.0，则建议升级 broker 版本，或增大客户端的 fetch size，这种错误是因为一个批次的消息已经超过了本次拉取允许的最大拉取消息大小，如代码@23。</li>
<li>依次更新消费者本地关于该队列的订阅缓存信息的 highWatermark、logStartOffset、lastStableOffset。</li>
</ul>
<p>从代码@3到@7 是多种异常信息的处理。<br>代码@3：如果出现如下3种错误码，则使用 debug 打印错误日志，并且向服务端请求元数据并更新本地缓存。</p>
<ul>
<li>NOT_LEADER_FOR_PARTITION<br>请求的节点上不是该分区的 Leader 分区。</li>
<li>REPLICA_NOT_AVAILABLE<br>该分区副本之间无法复制</li>
<li>KAFKA_STORAGE_ERROR<br>存储异常。</li>
</ul>
<p>Kafka 认为上述错误是可恢复的，而且对消费不会造成太大影响，故只是用 debug 打印日志，然后更新本地缓存即可。</p>
<p>代码@4：如果出现 UNKNOWN_TOPIC_OR_PARTITION 未知主题与分区时，则使用 warn 级别输出错误日志，并更新元数据。</p>
<p>代码@5：针对 OFFSET_OUT_OF_RANGE 偏移量超过范围异常的处理逻辑，其实现关键点如下：</p>
<ul>
<li>如果此次拉取的开始偏移量与消费者本地缓存的偏移量不一致，则丢弃，说明该消息已过期，打印错误日志。</li>
<li>如果此次拉取的开始偏移量与消费者本地缓存的偏移量一致，说明此时的偏移量非法，如果有配置重置偏移量策略，则使用重置偏移量，否则抛出 OffsetOutOfRangeException 错误。</li>
</ul>
<p>代码@6：如果是 TOPIC_AUTHORIZATION_FAILED 没有权限(ACL)则抛出异常。</p>
<p>代码@7：如果本次拉取的结果不是NONE(成功)，并且是可恢复的，将该队列的订阅关系移动到消费者缓存列表的末尾。如果成功，则返回拉取到的分区数据，其封装对象为 PartitionRecords。</p>
<p>接下来我们再来看看 2.2.1 fetchedRecords 中的另外一个核心方法。</p>
<h6 id="2-2-1-2-fetchRecords"><a href="#2-2-1-2-fetchRecords" class="headerlink" title="2.2.1.2 fetchRecords()"></a>2.2.1.2 fetchRecords()</h6><p>在介绍该方法之前同样先来看一下参数 PartitionRecords 的内部结构。</p>
<p>1、PartitionRecords 类图<br><img src="https://img-blog.csdnimg.cn/20191208194302768.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="PartitionRecords 类图"><br>主要的核心属性如下：</p>
<ul>
<li>TopicPartition partition<br>分区信息。</li>
<li>CompletedFetch completedFetch<br>Fetch请求完成结果</li>
<li>Iterator&lt;? extends RecordBatch&gt; batches<br>本次 Fetch 操作获取的结果集。</li>
<li>Set&lt; Long&gt; abortedProducerIds<br>与事务相关，后续会有专门的章节详细介绍。</li>
<li>PriorityQueue&lt;FetchResponse.AbortedTransaction&gt; abortedTransactions<br>与事务相关，后续会有专门的章节详细介绍。</li>
<li>int recordsRead<br>已读取的记录条数。</li>
<li>int bytesRead<br>已读取的记录字节数。</li>
<li>RecordBatch currentBatch<br>当前遍历的批次。</li>
<li>Record lastRecord<br>该迭代器最后一条消息。</li>
<li>long nextFetchOffset<br>下次待拉取的偏移量。</li>
</ul>
<p>2、fetchRecords 详解</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">private</span> List&lt;ConsumerRecord&lt;K, V&gt;&gt; fetchRecords(PartitionRecords partitionRecords, <span class="keyword">int</span> maxRecords) &#123;</span><br><span class="line">    <span class="keyword">if</span> (!subscriptions.isAssigned(partitionRecords.partition)) &#123;   <span class="comment">// @1</span></span><br><span class="line">            <span class="comment">// this can happen when a rebalance happened before fetched records are returned to the consumer&#x27;s poll call</span></span><br><span class="line">        log.debug(<span class="string">&quot;Not 
returning fetched records for partition &#123;&#125; since it is no longer assigned&quot;</span>,</span><br><span class="line">                    partitionRecords.partition);</span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (!subscriptions.isFetchable(partitionRecords.partition)) &#123; <span class="comment">// @2</span></span><br><span class="line">        <span class="comment">// this can happen when a partition is paused before fetched records are returned to the consumer&#x27;s</span></span><br><span class="line">        <span class="comment">// poll call or if the offset is being reset</span></span><br><span class="line">        log.debug(<span class="string">&quot;Not returning fetched records for assigned partition &#123;&#125; since it is no longer fetchable&quot;</span>,</span><br><span class="line">                    partitionRecords.partition);</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        <span class="keyword">long</span> position = subscriptions.position(partitionRecords.partition);       <span class="comment">// @3</span></span><br><span class="line">        <span class="keyword">if</span> (partitionRecords.nextFetchOffset == position) &#123;      <span class="comment">// @4</span></span><br><span class="line">            List&lt;ConsumerRecord&lt;K, V&gt;&gt; partRecords = partitionRecords.fetchRecords(maxRecords);</span><br><span class="line">            <span class="keyword">long</span> nextOffset = partitionRecords.nextFetchOffset;</span><br><span class="line">            log.trace(<span class="string">&quot;Returning fetched records at offset &#123;&#125; for assigned partition &#123;&#125; and update &quot;</span> +</span><br><span class="line">                        <span class="string">&quot;position to &#123;&#125;&quot;</span>, position, partitionRecords.partition, nextOffset);</span><br><span class="line">      
      subscriptions.position(partitionRecords.partition, nextOffset);</span><br><span class="line"></span><br><span class="line">            Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);  </span><br><span class="line">            <span class="keyword">if</span> (partitionLag != <span class="keyword">null</span>)</span><br><span class="line">                <span class="keyword">this</span>.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);</span><br><span class="line"></span><br><span class="line">            Long lead = subscriptions.partitionLead(partitionRecords.partition);</span><br><span class="line">            <span class="keyword">if</span> (lead != <span class="keyword">null</span>) &#123;</span><br><span class="line">                <span class="keyword">this</span>.sensors.recordPartitionLead(partitionRecords.partition, lead);</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">return</span> partRecords;</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;   <span class="comment">// @5</span></span><br><span class="line">            <span class="comment">// these records aren&#x27;t next in line based on the last consumed position, ignore them</span></span><br><span class="line">            <span class="comment">// they must be from an obsolete request</span></span><br><span class="line">            log.debug(<span class="string">&quot;Ignoring fetched records for &#123;&#125; at offset &#123;&#125; since the current position is &#123;&#125;&quot;</span>,</span><br><span class="line">                        partitionRecords.partition, partitionRecords.nextFetchOffset, position);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    partitionRecords.drain();</span><br><span 
class="line">    <span class="keyword">return</span> emptyList();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：从 PartitionRecords 中提取消息之前，再次判断订阅信息中是否包含当前分区，如果不包含，则使用 debug 打印日志，很有可能是发生了重平衡。</p>
<p>代码@2：是否允许拉取，如果用户主动暂停消费，则忽略本次拉取的消息。备注：Kafka 消费端如果消费太快，可以进行限流。</p>
<p>代码@3：从本地消费者缓存中获取该队列已消费的偏移量，在发送拉取消息时，就是从该偏移量开始拉取的。</p>
<p>代码@4：如果本地缓存已消费偏移量与从服务端拉回的起始偏移量相等的话，则认为是一个有效拉取，否则认为是一个过期的拉取，该批消息已被消费，见代码@5。如果是一个有效请求，则使用 sensors 收集统计信息，并返回拉取到的消息，返回结果被封装在 List&lt;ConsumerRecord&lt;K, V&gt;&gt; 中。</p>
<h5 id="2-2-2-sendFetches"><a href="#2-2-2-sendFetches" class="headerlink" title="2.2.2 sendFetches"></a>2.2.2 sendFetches</h5><p>“发送” fetch 请求，注意这里并不会触发网络操作，而是组装拉取请求，将其放入网络缓存区。</p>
<p>Fetcher#sendFetches</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span 
class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br><span class="line">90</span><br><span class="line">91</span><br><span class="line">92</span><br><span class="line">93</span><br><span class="line">94</span><br><span class="line">95</span><br><span class="line">96</span><br><span class="line">97</span><br><span class="line">98</span><br><span class="line">99</span><br><span class="line">100</span><br><span class="line">101</span><br><span class="line">102</span><br><span class="line">103</span><br><span class="line">104</span><br><span class="line">105</span><br><span class="line">106</span><br><span class="line">107</span><br><span class="line">108</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">synchronized</span> <span class="keyword">int</span> <span class="title">sendFetches</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    Map&lt;Node, FetchSessionHandler.FetchRequestData&gt; fetchRequestMap = prepareFetchRequests();  <span class="comment">// @1</span></span><br><span 
class="line">    <span class="keyword">for</span> (Map.Entry&lt;Node, FetchSessionHandler.FetchRequestData&gt; entry : fetchRequestMap.entrySet()) &#123;   <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">final</span> Node fetchTarget = entry.getKey();</span><br><span class="line">        <span class="keyword">final</span> FetchSessionHandler.FetchRequestData data = entry.getValue();</span><br><span class="line">        <span class="keyword">final</span> FetchRequest.Builder request = FetchRequest.Builder</span><br><span class="line">            .forConsumer(<span class="keyword">this</span>.maxWaitMs, <span class="keyword">this</span>.minBytes, data.toSend())</span><br><span class="line">            .isolationLevel(isolationLevel)</span><br><span class="line">            .setMaxBytes(<span class="keyword">this</span>.maxBytes)</span><br><span class="line">            .metadata(data.metadata())</span><br><span class="line">            .toForget(data.toForget());   <span class="comment">// @3</span></span><br><span class="line"> </span><br><span class="line">        client.send(fetchTarget, request)    <span class="comment">// @4</span></span><br><span class="line">            .addListener(<span class="keyword">new</span> RequestFutureListener&lt;ClientResponse&gt;() &#123;</span><br><span class="line">                <span class="meta">@Override</span></span><br><span class="line">                <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">onSuccess</span><span class="params">(ClientResponse resp)</span> </span>&#123;  <span class="comment">// @5</span></span><br><span class="line">                    <span class="keyword">synchronized</span> (Fetcher.<span class="keyword">this</span>) &#123;</span><br><span class="line">                        <span class="meta">@SuppressWarnings(&quot;unchecked&quot;)</span></span><br><span class="line">               
         FetchResponse&lt;Records&gt; response = (FetchResponse&lt;Records&gt;) resp.responseBody();</span><br><span class="line">                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());</span><br><span class="line">                        <span class="keyword">if</span> (handler == <span class="keyword">null</span>) &#123;</span><br><span class="line">                            log.error(<span class="string">&quot;Unable to find FetchSessionHandler for node &#123;&#125;. Ignoring fetch response.&quot;</span>,</span><br><span class="line">                                fetchTarget.id());</span><br><span class="line">                            <span class="keyword">return</span>;</span><br><span class="line">                        &#125;</span><br><span class="line">                        <span class="keyword">if</span> (!handler.handleResponse(response)) &#123;</span><br><span class="line">                            <span class="keyword">return</span>;</span><br><span class="line">                        &#125;</span><br><span class="line"></span><br><span class="line">                        Set&lt;TopicPartition&gt; partitions = <span class="keyword">new</span> HashSet&lt;&gt;(response.responseData().keySet());</span><br><span class="line">                        FetchResponseMetricAggregator metricAggregator = <span class="keyword">new</span> FetchResponseMetricAggregator(sensors, partitions);</span><br><span class="line">                        <span class="keyword">for</span> (Map.Entry&lt;TopicPartition, FetchResponse.PartitionData&lt;Records&gt;&gt; entry : </span><br><span class="line">                                 response.responseData().entrySet()) &#123;</span><br><span class="line">                            TopicPartition partition = entry.getKey();</span><br><span class="line">                            <span class="keyword">long</span> fetchOffset = 
data.sessionPartitions().get(partition).fetchOffset;</span><br><span class="line">                            FetchResponse.PartitionData&lt;Records&gt; fetchData = entry.getValue();</span><br><span class="line">                            completedFetches.add(<span class="keyword">new</span> CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,</span><br><span class="line">                                resp.requestHeader().apiVersion()));</span><br><span class="line">                            &#125;    <span class="comment">// @6</span></span><br><span class="line"></span><br><span class="line">                            sensors.fetchLatency.record(resp.requestLatencyMs());</span><br><span class="line">                        &#125;</span><br><span class="line">                  &#125;</span><br><span class="line">                  <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">onFailure</span><span class="params">(RuntimeException e)</span> </span>&#123;  <span class="comment">// @7</span></span><br><span class="line">                    <span class="keyword">synchronized</span> (Fetcher.<span class="keyword">this</span>) &#123;</span><br><span class="line">                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());</span><br><span class="line">                        <span class="keyword">if</span> (handler != <span class="keyword">null</span>) &#123;</span><br><span class="line">                            handler.handleError(e);</span><br><span class="line">                        &#125;</span><br><span class="line">                    &#125;</span><br><span class="line">                  &#125;</span><br><span class="line">        &#125;);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> fetchRequestMap.size();</span><br><span class="line">&#125;</span><br><span 
class="line">​~~~java</span><br><span class="line">上面的方法比较长，其实现的关键点如下：</span><br><span class="line">代码@<span class="number">1</span>：通过调用 Fetcher 的 prepareFetchRequests 方法按节点组装拉取请求，将在后面详细介绍。</span><br><span class="line"></span><br><span class="line">代码@<span class="number">2</span>：遍历上面的待发请求，进一步组装请求。下面就是分节点发送拉取请求。</span><br><span class="line"></span><br><span class="line">代码@<span class="number">3</span>：构建 FetchRequest 拉取请求对象。</span><br><span class="line"></span><br><span class="line">代码@<span class="number">4</span>：调用 NetworkClient 的 send 方法将其发送到发送缓存区，本文不会详细介绍网络方面的实现，但下文会截图说明拉取请求发送缓存区的一个关键点。</span><br><span class="line"></span><br><span class="line">代码@<span class="number">5</span>：这里会注册事件监听器，当消息从 broker 拉取到本地后触发回调，即消息拉取请求收到返回结果后会将返回结果放入到 completedFetches 中（代码@<span class="number">6</span>），这就和上文消息拉取时 Fetcher 的 fetchedRecords 方法形成闭环。</span><br><span class="line">代码@<span class="number">7</span>：消息拉取异常处理。</span><br><span class="line"></span><br><span class="line">接下来详细介绍 prepareFetchRequests 方法。</span><br><span class="line"></span><br><span class="line">###### 2.2.2.1 Fetcher prepareFetchRequests 方法详解</span><br><span class="line">​~~~java</span><br><span class="line"><span class="keyword">private</span> Map&lt;Node, FetchSessionHandler.FetchRequestData&gt; prepareFetchRequests() &#123;</span><br><span class="line">    Map&lt;Node, FetchSessionHandler.Builder&gt; fetchable = <span class="keyword">new</span> LinkedHashMap&lt;&gt;();  </span><br><span class="line">    <span class="keyword">for</span> (TopicPartition partition : fetchablePartitions()) &#123;    <span class="comment">// @1</span></span><br><span class="line">        Node node = metadata.partitionInfoIfCurrent(partition).map(PartitionInfo::leader).orElse(<span class="keyword">null</span>);  <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">if</span> (node == <span class="keyword">null</span>) &#123;    <span class="comment">// @3</span></span><br><span 
class="line">            metadata.requestUpdate();</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (client.isUnavailable(node)) &#123;   <span class="comment">// @4</span></span><br><span class="line">           client.maybeThrowAuthFailure(node);</span><br><span class="line">           log.trace(<span class="string">&quot;Skipping fetch for partition &#123;&#125; because node &#123;&#125; is awaiting reconnect backoff&quot;</span>, partition, node);</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (client.hasPendingRequests(node)) &#123;   <span class="comment">// @5</span></span><br><span class="line">            log.trace(<span class="string">&quot;Skipping fetch for partition &#123;&#125; because there is an in-flight request to &#123;&#125;&quot;</span>, partition, node);</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            <span class="comment">// if there is a leader and no in-flight requests, issue a new fetch</span></span><br><span class="line">            FetchSessionHandler.Builder builder = fetchable.get(node);    <span class="comment">// @7</span></span><br><span class="line">            <span class="keyword">if</span> (builder == <span class="keyword">null</span>) &#123;</span><br><span class="line">                FetchSessionHandler handler = sessionHandler(node.id());</span><br><span class="line">                <span class="keyword">if</span> (handler == <span class="keyword">null</span>) &#123;</span><br><span class="line">                    handler = <span class="keyword">new</span> FetchSessionHandler(logContext, node.id());</span><br><span class="line">                    sessionHandlers.put(node.id(), handler);</span><br><span class="line">                &#125;</span><br><span class="line">                builder = 
handler.newBuilder();</span><br><span class="line">                fetchable.put(node, builder);</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">long</span> position = <span class="keyword">this</span>.subscriptions.position(partition);</span><br><span class="line">            builder.add(partition, <span class="keyword">new</span> FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,</span><br><span class="line">            <span class="keyword">this</span>.fetchSize, Optional.empty()));</span><br><span class="line">            log.debug(<span class="string">&quot;Added &#123;&#125; fetch request for partition &#123;&#125; at offset &#123;&#125; to node &#123;&#125;&quot;</span>, isolationLevel,</span><br><span class="line">                    partition, position, node);</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    Map&lt;Node, FetchSessionHandler.FetchRequestData&gt; reqs = <span class="keyword">new</span> LinkedHashMap&lt;&gt;();  </span><br><span class="line">    <span class="keyword">for</span> (Map.Entry&lt;Node, FetchSessionHandler.Builder&gt; entry : fetchable.entrySet()) &#123;</span><br><span class="line">        reqs.put(entry.getKey(), entry.getValue().build());</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> reqs;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先通过调用 fetchablePartitions() 获取可发起拉取任务的分区信息，下文简单介绍一下。</p>
<p>代码@2：从客户端本地缓存的元数据中获取该分区的 Leader 节点信息。</p>
<p>代码@3：如果其 Leader 节点信息为空，则发起更新元数据请求，本次拉取任务将不会包含该分区。</p>
<p>代码@4：如果客户端与该分区的 Leader 连接未完成，如果是因为权限的原因则抛出 ACL 相关异常，否则打印日志，本次拉取请求不会包含该分区。</p>
<p>代码@5：判断该节点是否有挂起的拉取请求，即发送缓存区中是否存在待发送的请求，如果有，本次拉取请求将不会包含该分区。</p>
<p>代码@7：构建拉取请求，分节点组织请求。</p>
<h6 id="2-2-2-2-NetworkClient-send-方法关键点"><a href="#2-2-2-2-NetworkClient-send-方法关键点" class="headerlink" title="2.2.2.2 NetworkClient send 方法关键点"></a>2.2.2.2 NetworkClient send 方法关键点</h6><p><img src="https://img-blog.csdnimg.cn/2019120819462738.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>NetworkClient 的 send 方法只是将其放入 unsent 中。<br><img src="https://img-blog.csdnimg.cn/20191208194637842.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>与上文的 client.hasPendingRequests(node) 方法遥相呼应。</p>
<p>3、总结<br>上面的源码分析有点长，也有点枯燥，我们还是画一张流程图来进行总结。<br><img src="https://img-blog.csdnimg.cn/20191208194720402.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br><img src="https://img-blog.csdnimg.cn/20191208194726675.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>Kafka 的消息拉取流程还是比较复杂的，后面会基于上述流程，重点进行拆解，例如消费进度提交，负载队列重平衡等等。</p>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			// Initialise the BTWPlugin widget (presumably the OpenWrite "read more" gate served by
			// the readmore.js script tag above; confirm) for non-mobile visitors only.
			// isMobile holds the regex match result: null when none of the mobile/tablet
			// user-agent tokens below is present.
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			// BTWPlugin is defined by an external script. If that request fails or is blocked the
			// identifier is undeclared, so check with typeof rather than letting `new` throw.
			if (!isMobile && typeof BTWPlugin !== 'undefined') {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/posts/f81e58bb.html" rel="next" title="源码分析 Kafka 消息发送流程(文末附流程图)">
                <i class="fa fa-chevron-left"></i> 源码分析 Kafka 消息发送流程(文末附流程图)
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/d9d4c345.html" rel="prev" title="初始 Kafka Consumer 消费者">
                初始 Kafka Consumer 消费者 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#%EF%BC%91%E3%80%81KafkaConsumer-poll-%E8%AF%A6%E8%A7%A3"><span class="nav-number">1.</span> <span class="nav-text">１、KafkaConsumer poll 详解</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#1-1-KafkaConsumer-updateAssignmentMetadataIfNeeded-%E8%AF%A6%E8%A7%A3"><span class="nav-number">1.0.1.</span> <span class="nav-text">1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 详解</span></a><ol class="nav-child"><li class="nav-item nav-level-5"><a class="nav-link" href="#1-1-1-ConsumerCoordinator-poll"><span class="nav-number">1.0.1.1.</span> <span class="nav-text">1.1.1 ConsumerCoordinator#poll</span></a></li><li class="nav-item nav-level-5"><a class="nav-link" href="#1-1-2-updateFetchPositions-%E8%AF%A6%E8%A7%A3"><span class="nav-number">1.0.1.2.</span> <span class="nav-text">1.1.2 updateFetchPositions 详解</span></a></li></ol></li><li class="nav-item nav-level-4"><a class="nav-link" href="#1-2-%E6%B6%88%E6%81%AF%E6%8B%89%E5%8F%96"><span class="nav-number">1.0.2.</span> <span class="nav-text">1.2 消息拉取</span></a></li></ol></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81Fetcher-%E7%B1%BB%E8%AF%A6%E8%A7%A3"><span class="nav-number">2.</span> <span class="nav-text">2、Fetcher 类详解</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-1-%E7%B1%BB%E5%9B%BE"><span class="nav-number">2.0.1.</span> <span class="nav-text">2.1 类图</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-2-Fetcher-%E6%A0%B8%E5%BF%83%E6%96%B9%E6%B3%95"><span class="nav-number">2.0.2.</span> <span class="nav-text">2.2 Fetcher 核心方法</span></a><ol class="nav-child"><li class="nav-item nav-level-5"><a class="nav-link" href="#2-2-1-Fetcher-fetchedRecords"><span class="nav-number">2.0.2.1.</span> <span class="nav-text">2.2.1 
Fetcher#fetchedRecords</span></a><ol class="nav-child"><li class="nav-item nav-level-6"><a class="nav-link" href="#2-2-1-1-Fetcher-parseCompletedFetch"><span class="nav-number">2.0.2.1.1.</span> <span class="nav-text">2.2.1.1 Fetcher#parseCompletedFetch</span></a></li><li class="nav-item nav-level-6"><a class="nav-link" href="#2-2-1-2-fetchRecords"><span class="nav-number">2.0.2.1.2.</span> <span class="nav-text">2.2.1.2 fetchRecords()</span></a></li></ol></li><li class="nav-item nav-level-5"><a class="nav-link" href="#2-2-2-sendFetches"><span class="nav-number">2.0.2.2.</span> <span class="nav-text">2.2.2 sendFetches</span></a><ol class="nav-child"><li class="nav-item nav-level-6"><a class="nav-link" href="#2-2-2-2-NetworkClient-send-%E6%96%B9%E6%B3%95%E5%85%B3%E9%94%AE%E7%82%B9"><span class="nav-number">2.0.2.2.1.</span> <span class="nav-text">2.2.2.2 NetworkClient send 方法关键点</span></a></li></ol></li></ol></li></ol></li></ol></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  // If window.Promise is not reported as a plain function by Object.prototype.toString
  // (for example it is missing, or some non-function value), replace it with null.
  (function (root) {
    var promiseTag = Object.prototype.toString.call(root.Promise);
    var isFunction = promiseTag === '[object Function]';

    if (!isFunction) {
      root.Promise = null;
    }
  })(window);
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <!-- Configures the AV SDK loaded by the script tag above. The two string arguments are presumably the LeanCloud app ID and app key (confirm against the SDK docs). This has to run before the visitor-counter script below, which uses AV.Query, AV.ACL and AV.Object. -->
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    /**
     * Displays the stored visit count for every `.leancloud_visitors` element on the page.
     *
     * The trimmed `id` of each element is used as the lookup key: one query fetches all
     * Counter records whose `url` is in that list, each record's `time` value is written
     * into the matching element's `.leancloud-visitors-count` child, and any element whose
     * count text is still empty afterwards is shown as 0.
     *
     * @param {Function} Counter  Class handed to `AV.Query`; the caller creates it with
     *                            `AV.Object.extend("Counter")`.
     */
    function showTime(Counter) {
      var COUNT_CONTAINER_REF = '.leancloud-visitors-count';
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push($(this).attr("id").trim());
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var r;
          var e;

          // No records at all: every listed element shows 0.
          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          // Write each stored count into the element whose id equals the record's url.
          for (r = 0; r < results.length; r++) {
            var item = results[r];
            var url = item.get('url');
            var time = item.get('time');

            $(document.getElementById(url)).find(COUNT_CONTAINER_REF).text(time);
          }

          // Elements that had no matching record still have empty text; show 0 for them.
          for (e = 0; e < entries.length; e++) {
            var countSpan = $(document.getElementById(entries[e])).find(COUNT_CONTAINER_REF);
            if (countSpan.text() === '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          // The rejection may arrive as (error) or as (object, error) depending on how the
          // SDK invokes the handler; accept both so the logger itself never throws.
          var failure = error || object || {};
          console.log("Error: " + failure.code + " " + failure.message);
        });
    }

    /**
     * Registers one visit for the `.leancloud_visitors` element and shows the new total.
     *
     * The element's trimmed `id` is the record key (`url`) and its trimmed
     * `data-flag-title` attribute is the record title. If a Counter record with that url
     * exists, its `time` field is incremented and saved; otherwise a new record is created
     * with `time` = 1. After a successful save the record's `time` value is written into
     * the element's `.leancloud-visitors-count` child. Failures are only logged.
     *
     * @param {Function} Counter  Class used both for `AV.Query` and to construct new
     *                            records; the caller creates it with `AV.Object.extend`.
     */
    function addCount(Counter) {
      var $widget = $(".leancloud_visitors");
      var pageUrl = $widget.attr('id').trim();
      var pageTitle = $widget.attr('data-flag-title').trim();
      var lookup = new AV.Query(Counter);

      // Writes a saved record's `time` value into this page's count element.
      function showCount(record) {
        var $host = $(document.getElementById(pageUrl));
        $host.find('.leancloud-visitors-count').text(record.get('time'));
      }

      // Existing record: bump `time` and save it back.
      function bumpExisting(record) {
        record.fetchWhenSave(true);
        record.increment("time");
        record.save(null, {
          success: showCount,
          error: function (failed, error) {
            console.log('Failed to save Visitor num, with error message: ' + error.message);
          }
        });
      }

      // No record yet: create one, starting the count at 1.
      function createFirst() {
        var fresh = new Counter();

        // The new record is made publicly readable and publicly writable.
        var acl = new AV.ACL();
        acl.setPublicReadAccess(true);
        acl.setPublicWriteAccess(true);
        fresh.setACL(acl);

        fresh.set("title", pageTitle);
        fresh.set("url", pageUrl);
        fresh.set("time", 1);
        fresh.save(null, {
          success: showCount,
          error: function (failed, error) {
            console.log('Failed to create');
          }
        });
      }

      lookup.equalTo("url", pageUrl);
      lookup.find({
        success: function (matches) {
          if (matches.length > 0) {
            bumpExisting(matches[0]);
            return;
          }
          createFirst();
        },
        error: function (error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    // On DOM ready, choose the counter mode. A page with exactly one `.leancloud_visitors`
    // element (presumably a single-post page) records a visit; otherwise a page with more
    // than one `.post-title-link` (presumably a post list) only displays stored counts.
    $(function () {
      var Counter = AV.Object.extend("Counter");
      var widgetCount = $('.leancloud_visitors').length;

      if (widgetCount == 1) {
        addCount(Counter);
        return;
      }

      if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
